#include "config.h"

#define DBG_SUBSYS S_LIBCONTROL

#include "lich_api.h"
#include "lich_qos.h"

#include "volume_ctl.h"
#include "volume_ctl_internal.h"

/*
 * Per-chunk context handed to one __chunk_sync_force() task.
 * One instance is allocated per chunk id by __volume_ctl_chunk_sync_force__().
 */
typedef struct {
        /*
         * fork/idx pair passed to co_fork_return() when the task finishes.
         * NOTE(review): not assigned in the code visible here -- presumably
         * filled in by co_fork_add(); confirm against the co_fork API.
         */
        co_fork_hook_t fork_hook;

        /* volume whose lease/chunk_set/chunk_sync callbacks the task uses */
        volume_proto_t *volume_proto;
        /* source node id, passed to volume_proto->chunk_set() */
        nid_t srcnid;
        /* id of the chunk this task works on */
        chkid_t chkid;
} chunk_sync_ctx_t;


/*
 * Forked task body: force-sync a single chunk.
 *
 * @arg: a chunk_sync_ctx_t describing the volume, source node and chunk id.
 *
 * Sequence: set the volume lease, call chunk_set() with __S_CHECK for the
 * source node, then chunk_sync() the chunk. The outcome is never returned
 * directly; it is reported through co_fork_return() on the context's fork
 * hook: 0 on success, otherwise the first failing step's error code.
 */
STATIC void __chunk_sync_force(void *arg)
{
        chunk_sync_ctx_t *ctx = arg;
        volume_proto_t *volume_proto = ctx->volume_proto;
        int ret;

        // SCHEDULE_LEASE_SET();
        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = volume_proto->chunk_set(volume_proto, &ctx->chkid, &ctx->srcnid, __S_CHECK);
        if (unlikely(ret)) {
                if (unlikely(ret != EBUSY))
                        GOTO(err_ret, ret);

                // bug #11420: EBUSY from chunk_set is not an error for this
                // task; skip the sync and report success.
                goto out;
        }

        ANALYSIS_BEGIN(0);

        ret = volume_proto->chunk_sync(volume_proto, &ctx->chkid, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(0, 1000 * 1000, "__chunk_sync_force");

out:
        /* both the normal path and the EBUSY shortcut report 0 */
        ret = 0;
err_ret:
        co_fork_return(ctx->fork_hook.fork, ctx->fork_hook.idx, ret);
}

/*
 * Force-sync a batch of chunks of one volume, one forked task per chunk.
 *
 * @volume_proto: volume the chunks belong to; shared by every task.
 * @srcnid:       source node id, copied into each task's context.
 * @chkids:       array of @chknum chunk ids.
 * @chknum:       number of entries in @chkids and @retval.
 * @retval:       out array of @chknum ints; retval[i] is taken from
 *                fork->entries[i].retval after the join (presumably the
 *                result of the i-th added task -- confirm against co_fork).
 *
 * Returns 0 once every task has been joined; per-chunk failures are reported
 * only through @retval. Returns EINVAL for a negative @chknum, or the error
 * from co_fork_create()/ymalloc(). A zero @chknum is a no-op returning 0.
 */
STATIC int __volume_ctl_chunk_sync_force__(volume_proto_t *volume_proto, const nid_t *srcnid,
                                           const chkid_t *chkids, int chknum, int *retval)
{
        int ret;
        size_t size;
        co_fork_t *fork;
        chunk_sync_ctx_t *ctxs, *ctx;

        /* a negative count would turn into a huge size_t in the size below */
        if (unlikely(chknum < 0)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        /* nothing to sync: avoid a zero-sized allocation and memset */
        if (chknum == 0)
                return 0;

        ret = co_fork_create(&fork, __FUNCTION__, chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        size = sizeof(chunk_sync_ctx_t) * (size_t)chknum;

        ret = ymalloc((void **)&ctxs, size);
        if (unlikely(ret))
                GOTO(err_fork, ret);

        memset(ctxs, 0, size);

        for (int i = 0; i < chknum; i++) {
                ctx = &ctxs[i];

                ctx->volume_proto = volume_proto;
                ctx->chkid = chkids[i];
                ctx->srcnid = *srcnid;

                co_fork_add(fork, __chunk_sync_force, ctx);
        }

        /* the contexts must stay alive until every task has reported back */
        co_fork_join(fork);

        /*
         * Copy out exactly chknum results: that is the size the caller
         * promised for retval, independent of how the fork sizes itself.
         */
        for (int i = 0; i < chknum; i++) {
                retval[i] = fork->entries[i].retval;
        }

        yfree((void **)&ctxs);
        yfree((void **)&fork);
        return 0;
err_fork:
        yfree((void **)&fork);
err_ret:
        return ret;
}

/*
 * core_request() handler behind volume_ctl_chunk_sync_force().
 *
 * @ap: arguments in the order packed by the caller: source nid, volume id,
 *      chunk id array, chunk count, per-chunk result array.
 *
 * Gets the volume's cache entry, read-locks it, and runs the batched forced
 * sync on it. The lock and the entry are dropped again on every path.
 * Returns 0 on success or the first error encountered.
 */
STATIC int __volume_ctl_chunk_sync_force(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;

        const nid_t *srcnid = va_arg(ap, nid_t *);
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        int chknum = va_arg(ap, int);
        int *retval = va_arg(ap, int *);
        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

#if 0
        ret = volume_proto_recovery_throt_req(volume_proto);
        if (unlikely(ret))
                GOTO(err_lock, ret);
#endif

        ANALYSIS_BEGIN(0);

        ret = __volume_ctl_chunk_sync_force__(volume_proto, srcnid, chkid, chknum, retval);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ANALYSIS_END(0, 1000 * 1000, "__volume_ctl_chunk_sync_force");

        /* success: ret is 0 here, fall through the shared cleanup */
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/*
 * Force-sync @chknum chunks of volume @volid from node @srcnid.
 *
 * The work is handed to core_request(), keyed by core_hash(volid) at
 * SCHEDULE_PRIORITY1, and carried out by __volume_ctl_chunk_sync_force().
 *
 * @srcnid: source node id.
 * @volid:  volume the chunks belong to.
 * @chkid:  array of @chknum chunk ids.
 * @chknum: number of chunks.
 * @retval: out array receiving one result per chunk.
 *
 * Returns 0 on success, otherwise the error from core_request().
 */
int volume_ctl_chunk_sync_force(const nid_t *srcnid, const volid_t *volid,
                                const chkid_t *chkid, int chknum, int *retval)
{
        int ret;

        ret = core_request(core_hash(volid), SCHEDULE_PRIORITY1, "chunk_sync",
                           __volume_ctl_chunk_sync_force,
                           srcnid, volid, chkid, chknum, retval);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* ret is 0 on the success path */
err_ret:
        return ret;
}

/*
 * core_request() handler behind volume_ctl_chunk_sync().
 *
 * @ap: arguments in the order packed by the caller: volume id, chunk id,
 *      pointer to the force flag, pointer to the out flags.
 *
 * Gets the volume's cache entry, read-locks it, then runs chunk_check()
 * followed by chunk_sync() on the chunk, both receiving @oflags. The force
 * flag is currently unused (its only consumer is compiled out). The lock and
 * the entry are dropped again on every path.
 * Returns 0 on success or the first error encountered.
 */
STATIC int __volume_ctl_chunk_sync(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;

        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        int *force = va_arg(ap, int *);
        int *oflags = va_arg(ap, int *);
        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        (void) force;
#if 0
        if (*force == 0) {
                ret = volume_proto_recovery_throt_req(volume_proto);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }
#endif

        ANALYSIS_BEGIN(0);

        ret = volume_proto->chunk_check(volume_proto, chkid, oflags);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = volume_proto->chunk_sync(volume_proto, chkid, oflags);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ANALYSIS_QUEUE(0, 1000 * 1000, "__volume_ctl_chunk_sync");

        /* success: ret is 0 here, fall through the shared cleanup */
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/*
 * Check and sync a single chunk of volume @volid.
 *
 * The work is handed to core_request(), keyed by core_hash(volid) with
 * priority -1, and carried out by __volume_ctl_chunk_sync().
 *
 * @volid:  volume the chunk belongs to.
 * @chkid:  chunk to sync.
 * @force:  passed to the handler by address; the handler currently ignores it.
 * @oflags: forwarded to the volume's chunk_check()/chunk_sync() callbacks.
 *
 * Returns 0 on success, otherwise the error from core_request().
 */
int volume_ctl_chunk_sync(const volid_t *volid, const chkid_t *chkid, int force, int *oflags)
{
        int ret;

        /*
         * NOTE(review): &force points at this function's own parameter, so
         * this presumably relies on core_request() completing the handler
         * before returning -- confirm.
         */
        ret = core_request(core_hash(volid), -1, "chunk_sync",
                           __volume_ctl_chunk_sync,
                           volid, chkid, &force, oflags);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* ret is 0 on the success path */
err_ret:
        return ret;
}
